iT邦幫忙

2023 iThome Ironman Contest

DAY 26

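Now that we have seen how to build a server that accepts WebSocket requests,

today we look at Ktor on the client side:

how to write code that keeps sending data over a WebSocket,

and how the framework implements it.

First, let's look at the approach shown in the official documentation.

Before anything else, add the following to build.gradle.kts: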

tasks.named<JavaExec>("run") {
    standardInput = System.`in`
}

The client code then looks like this:

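import io.ktor.client.*
import io.ktor.client.plugins.websocket.*
import io.ktor.http.*
import io.ktor.websocket.*
import kotlinx.coroutines.*

fun main() {
	// No engine is named here, so one is loaded from the client engine dependency on the classpath
	val client = HttpClient {
		install(WebSockets)
	}
	runBlocking {
		client.webSocket(method = HttpMethod.Get, host = "127.0.0.1", port = 8080, path = "/chat") {
			while (true) {
				// Skip any frame that is not a text frame
				val othersMessage = incoming.receive() as? Frame.Text ?: continue
				println(othersMessage.readText())
				// Read one line from standard input; null means end of input
				val myMessage = readlnOrNull()
				if (myMessage != null) {
					send(myMessage)
				}
			}
		}
	}
	client.close()
	println("Connection closed. Goodbye!")
}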

Much of Ktor's API is declared as suspend so that coroutines can switch while waiting,

so here the whole flow is placed inside runBlocking, which makes it run synchronously
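To make the role of runBlocking concrete, here is a minimal sketch. It assumes only kotlinx.coroutines is available; fetchGreeting is a hypothetical stand-in for a suspending Ktor call, not part of Ktor itself.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a suspending call such as client.webSocket(...)
suspend fun fetchGreeting(): String {
    delay(100) // suspends the coroutine without blocking a thread
    return "hello"
}

fun main() {
    // runBlocking blocks the calling thread until the coroutine inside completes,
    // so the println below only runs after fetchGreeting() has returned
    val greeting = runBlocking { fetchGreeting() }
    println(greeting)
}
```

A suspend function cannot be called directly from a regular function such as main, which is why the sample wraps the WebSocket session in runBlocking.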

The client.webSocket used here is implemented as follows

/**
 * Opens a [block] with [DefaultClientWebSocketSession].
 */
public suspend fun HttpClient.webSocket(
    method: HttpMethod = HttpMethod.Get,
    host: String? = null,
    port: Int? = null,
    path: String? = null,
    request: HttpRequestBuilder.() -> Unit = {},
    block: suspend DefaultClientWebSocketSession.() -> Unit
) {
    webSocket(
        {
            this.method = method
            url("ws", host, port, path)
            request()
        },
        block
    )
}

And HttpClient.webSocket(request, block) is

/**
 * Opens a [block] with [DefaultClientWebSocketSession].
 */
public suspend fun HttpClient.webSocket(
    request: HttpRequestBuilder.() -> Unit,
    block: suspend DefaultClientWebSocketSession.() -> Unit
) {
    plugin(WebSockets)
    val session = prepareRequest {
        url {
            protocol = URLProtocol.WS
        }
        request()
    }

    session.body<DefaultClientWebSocketSession, Unit> {
        try {
            block(it)
        } finally {
            it.close()
        }
    }
}
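The try/finally around block(it) means the session is closed even when our handler throws. A rough sketch of that shape in plain Kotlin follows; FakeSession and withSession are hypothetical names made up for illustration and are not Ktor types.

```kotlin
// Hypothetical stand-in for DefaultClientWebSocketSession; not a Ktor type
class FakeSession {
    var closed = false
        private set

    fun close() {
        closed = true
    }
}

// Same shape as the try/finally in HttpClient.webSocket: run the block, then always close
inline fun withSession(session: FakeSession, block: (FakeSession) -> Unit) {
    try {
        block(session)
    } finally {
        session.close()
    }
}

fun main() {
    val session = FakeSession()
    try {
        withSession(session) { error("handler failed") }
    } catch (e: IllegalStateException) {
        println("handler threw: ${e.message}")
    }
    println("closed = ${session.closed}")
}
```

The exception still propagates to the caller, but only after close() has run.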

URLProtocol.WS and the related definitions are as follows

/**
 * Represents URL protocol
 * @property name of protocol (schema)
 * @property defaultPort default port for protocol or `-1` if not known
 */
public data class URLProtocol(val name: String, val defaultPort: Int) {
    init {
        require(name.all { it.isLowerCase() }) { "All characters should be lower case" }
    }

    @Suppress("PublicApiImplicitType")
    public companion object {
        /**
         * HTTP with port 80
         */
        public val HTTP: URLProtocol = URLProtocol("http", 80)

        /**
         * secure HTTPS with port 443
         */
        public val HTTPS: URLProtocol = URLProtocol("https", 443)

        /**
         * Web socket over HTTP on port 80
         */
        public val WS: URLProtocol = URLProtocol("ws", 80)

        /**
         * Web socket over secure HTTPS on port 443
         */
        public val WSS: URLProtocol = URLProtocol("wss", 443)

        /**
         * Socks proxy url protocol.
         */
        public val SOCKS: URLProtocol = URLProtocol("socks", 1080)

        /**
         * Protocols by names map
         */
        public val byName: Map<String, URLProtocol> = listOf(HTTP, HTTPS, WS, WSS, SOCKS).associateBy { it.name }

        /**
         * Create an instance by [name] or use already existing instance
         */
        public fun createOrDefault(name: String): URLProtocol = name.toLowerCasePreservingASCIIRules().let {
            byName[it] ?: URLProtocol(it, DEFAULT_PORT)
        }
    }
}

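As we can see, besides WS there are several other protocols defined: HTTP, HTTPS, WSS and SOCKS

Once the URLProtocol is settled, the rest proceeds much like any other HttpRequest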
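As a quick illustration of the companion object shown above, the sketch below looks protocols up by name. It assumes the io.ktor:ktor-http artifact is on the classpath; "mqtt" is just an arbitrary example of a name that is not predefined.

```kotlin
import io.ktor.http.URLProtocol

fun main() {
    // Known names resolve to the predefined instances, regardless of case
    val ws = URLProtocol.createOrDefault("WS")
    println(ws == URLProtocol.WS)          // true
    println(URLProtocol.WSS.defaultPort)   // 443

    // A name missing from byName produces a new URLProtocol instance
    val custom = URLProtocol.createOrDefault("mqtt")
    println(custom.name)                   // mqtt
}
```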

The next part is an infinite loop

while (true) {
	val othersMessage = incoming.receive() as? Frame.Text ?: continue
}

Here incoming is a ReceiveChannel

/**
 * An incoming frames channel.
 * Note that if you use `webSocket` to handle a WebSockets session,
 * the incoming channel doesn't contain control frames such as the ping/pong or close frames.
 * If you need control over control frames, use the `webSocketRaw` function.
 */
public val incoming: ReceiveChannel<Frame>

ReceiveChannel.receive is defined as follows

/**
 * Retrieves and removes an element from this channel if it's not empty, or suspends the caller while the channel is empty,
 * or throws a [ClosedReceiveChannelException] if the channel [is closed for `receive`][isClosedForReceive].
 * If the channel was closed because of an exception, it is called a _failed_ channel and this function
 * will throw the original [close][SendChannel.close] cause exception.
 *
 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
 * function is suspended, this function immediately resumes with a [CancellationException].
 * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
 * suspended, it will not resume successfully. The `receive` call can retrieve the element from the channel,
 * but then throw [CancellationException], thus failing to deliver the element.
 * See "Undelivered elements" section in [Channel] documentation for details on handling undelivered elements.
 *
 * Note that this function does not check for cancellation when it is not suspended.
 * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
 *
 * This function can be used in [select] invocations with the [onReceive] clause.
 * Use [tryReceive] to try receiving from this channel without waiting.
 */
public suspend fun receive(): E
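The KDoc says that receive suspends while the channel is empty and throws ClosedReceiveChannelException once the channel is closed. Here is a small sketch of that behavior using a plain kotlinx.coroutines Channel of strings as a stand-in for incoming; receiveUntilClosed is a hypothetical helper, not a Ktor or kotlinx function.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects elements until the channel is closed
suspend fun receiveUntilClosed(incoming: ReceiveChannel<String>): List<String> {
    val received = mutableListOf<String>()
    try {
        while (true) {
            received += incoming.receive() // suspends until an element arrives
        }
    } catch (e: ClosedReceiveChannelException) {
        // The sender closed the channel normally; leave the loop
    }
    return received
}

fun main(): Unit = runBlocking {
    val channel = Channel<String>() // stand-in for the session's incoming channel
    launch {
        channel.send("first")
        channel.send("second")
        channel.close() // simulates the peer ending the conversation
    }
    println(receiveUntilClosed(channel)) // [first, second]
}
```

This also shows that a bare while (true) around receive() ends with an exception when the channel closes, so a real client would likely want a catch like this one or a for loop over incoming.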

Once a frame arrives, the code tries to cast it to Frame.Text

If the cast fails, it continues to the next iteration
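The as? ... ?: continue idiom can be tried out without Ktor. The sketch below uses a hypothetical Message hierarchy as a miniature of Frame; none of these names come from Ktor.

```kotlin
// Hypothetical miniature of Ktor's Frame hierarchy, for illustration only
sealed class Message {
    data class Text(val content: String) : Message()
    class Binary(val bytes: ByteArray) : Message()
}

fun textContents(messages: List<Message>): List<String> {
    val result = mutableListOf<String>()
    for (message in messages) {
        // as? yields null when the cast fails; ?: continue then skips this iteration
        val text = message as? Message.Text ?: continue
        result += text.content
    }
    return result
}

fun main() {
    val messages = listOf(
        Message.Text("hi"),
        Message.Binary(byteArrayOf(1)),
        Message.Text("bye")
    )
    println(textContents(messages)) // [hi, bye]
}
```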

Once we have a Frame.Text, we can work with othersMessage

println(othersMessage.readText())

We have seen Frame.Text.readText() before

/**
 * Reads text content from the text frame.
 * Shouldn't be used for fragmented frames: such frames need to be reassembled first.
 */
public fun Frame.Text.readText(): String {
    require(fin) { "Text could be only extracted from non-fragmented frame" }
    return Charsets.UTF_8.newDecoder().decode(buildPacket { writeFully(data) })
}
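As a small check of readText, the sketch below builds a text frame locally and reads it back. It assumes the io.ktor:ktor-websockets artifact is on the classpath and does not open any connection.

```kotlin
import io.ktor.websocket.Frame
import io.ktor.websocket.readText

fun main() {
    // Frame.Text(String) creates a single, final text frame, so fin is true
    val frame = Frame.Text("hello")
    println(frame.fin)
    println(frame.readText())
}
```

Because fin is true for a frame built this way, the require(fin) check in readText passes.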

Next comes an interesting part

val myMessage = readlnOrNull()
if (myMessage != null) {
	send(myMessage)
}

This reads from System.in, but how does the program know that we will feed data in from there?

That takes us back to the earlier snippet

tasks.named<JavaExec>("run") {
    standardInput = System.`in`
}

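the task configuration we defined in build.gradle.kts

It connects Gradle's standard input to the application, so the program can keep reading from System.in under gradle run

Each time a line is read, we call send(myMessage)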

/**
 * Enqueues a text frame for sending with the specified [content].
 *
 * May suspend if the outgoing queue is full, and throw an exception if the channel is already closed.
 */
public suspend fun WebSocketSession.send(content: String): Unit = send(Frame.Text(content))

/**
 * Enqueue a frame, may suspend if an outgoing queue is full. May throw an exception if the
 * outgoing channel is already closed, so it is impossible to transfer any message.
 * Frames that were sent after close frame could be silently ignored.
 * Note that a close frame could be sent automatically in reply to a peer's close frame unless it is
 * raw WebSocket session.
 */
public suspend fun send(frame: Frame) {
	outgoing.send(frame)
}
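The KDoc mentions two behaviors of send: it may suspend when the outgoing queue is full, and it may throw once the channel is closed. The sketch below reproduces both with a plain kotlinx.coroutines Channel standing in for outgoing. The capacity of 1 is an arbitrary assumption, and a real WebSocket session may surface a different exception type.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.runBlocking

fun main(): Unit = runBlocking {
    // Stand-in for the session's outgoing channel, with room for one element
    val outgoing = Channel<String>(capacity = 1)

    outgoing.send("a") // fits in the buffer, returns immediately

    // The buffer is now full: trySend fails where send would suspend
    println(outgoing.trySend("b").isSuccess) // false

    outgoing.close()
    try {
        outgoing.send("c")
    } catch (e: ClosedSendChannelException) {
        println("send failed: channel is closed")
    }
}
```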

With that, we have walked through how Ktor, acting as a client, sends data over a WebSocket

